[observability][export-api] Write submission job events #47468
Conversation
Signed-off-by: Nikita Vemuri <[email protected]>
ExportEventDataType = Union[ExportSubmissionJobEventData]


def get_event_id():
nit
Suggested change:
- def get_event_id():
+ def generate_event_id():
# Force flush so that we won't lose events
self.logger.handlers[0].flush()
Is this a pattern we follow in the other event exporters as well?
I think batched flushing is reasonable if we get significant performance improvements at the cost of some reliability.
Since it is only used by a job, it is unlikely to cause a problem, but I agree it is probably okay to lose some events here... (I don't think we guarantee a flush for every event after it is finished.)
Yeah, I added this behavior for consistency with the EventLoggerAdapter for existing python events (ray/python/ray/_private/event/event_logger.py, line 105 in d8a85c5):
self.logger.handlers[0].flush()
LogEventReporter for C++ events also flushes after each event by default (line 103 in d8a85c5):
bool force_flush = true,
The export events from the python layer are unlikely to be created with very high volume (unlike tasks), so we can probably just keep this flush logic for now for consistency and optimize later if needed.
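The per-event force-flush behavior under discussion can be sketched as a minimal logger wrapper (the class and logger names here are hypothetical; the real implementation is Ray's ExportEventLoggerAdapter):

```python
import logging


class FlushingEventLogger:
    """Minimal sketch of an event logger that force-flushes after each
    emit, trading throughput for durability: an event written just before
    a crash is already on disk rather than sitting in a handler buffer."""

    def __init__(self, path: str):
        self.logger = logging.getLogger("export_events_sketch")
        self.logger.setLevel(logging.INFO)
        self.logger.addHandler(logging.FileHandler(path))

    def emit(self, event_json: str) -> None:
        self.logger.info(event_json)
        # Force flush so that we won't lose events buffered in the handler.
        self.logger.handlers[0].flush()
```

Batched flushing would drop the per-emit flush() call and rely on handler buffering, which is the reliability trade-off the comments above weigh.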
_export_event_logger = {}


def get_export_event_logger(source: ExportEvent.SourceType, sink_dir: str):
Can you check how we do structured logging in the python layer and see if we should follow similar patterns?
Do they also use a singleton pattern like this?
Structured logging fetches various registered loggers for the different source types, but the existing python events use this singleton pattern. We want to use a single logger to avoid any concurrency issues from multiple threads. Using a registered logger is an option (and is also a singleton), but I think this API is cleaner because there are no separate initialization and get functions.
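The create-or-get singleton pattern being discussed can be sketched as follows (a simplified illustration, not the actual Ray implementation; the lock and file naming are assumptions):

```python
import logging
import os
import threading

# One logger per source type, created lazily.
_export_event_logger = {}
_export_event_logger_lock = threading.Lock()


def get_export_event_logger(source: str, sink_dir: str) -> logging.Logger:
    """Create-or-get a singleton logger for `source`. The lock ensures two
    concurrent callers never create two handlers writing the same file."""
    with _export_event_logger_lock:
        if source not in _export_event_logger:
            logger = logging.getLogger(f"export_event_{source}")
            logger.setLevel(logging.INFO)
            sink_path = os.path.join(sink_dir, f"event_{source}.log")
            logger.addHandler(logging.FileHandler(sink_path))
            _export_event_logger[source] = logger
        return _export_event_logger[source]
```

Because initialization happens inside the getter, callers never need a separate register step, which is the API-cleanliness point made above.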
Test submission job events are correctly generated and written to file
as the job goes through various state changes in its lifecycle.
"""
ray_constants.RAY_ENABLE_EXPORT_API_WRITE = True
This screws up the global variable for the rest of the tests. You should probably create a fixture if you want to do this (or, alternatively, just update os.environ).
Updated to set the environment variable in addition to passing the env var when calling ray start, because the constant also needs to be set outside ray tasks when submitting a job.
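One way to set an environment variable for a single test without leaking it into the rest of the suite (the concern raised above) is a small restore-on-exit helper; the variable name below is purely illustrative:

```python
import os
from contextlib import contextmanager


@contextmanager
def set_env(name: str, value: str):
    """Set an environment variable for the duration of a block and restore
    the previous state afterwards, so later tests see an unchanged environment."""
    old = os.environ.get(name)
    os.environ[name] = value
    try:
        yield
    finally:
        if old is None:
            del os.environ[name]
        else:
            os.environ[name] = old
```

pytest's built-in monkeypatch.setenv fixture provides the same cleanup automatically and is usually the idiomatic choice in a test file.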
filepath.touch(exist_ok=True)
# Configure the logger.
handler = logging.handlers.RotatingFileHandler(
    filepath, maxBytes=(100 * 1e6), backupCount=20  # 100 MB max file size
make it configurable via env var?
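The suggestion could look something like the sketch below, reading the rotation limits from environment variables with the current hard-coded values as defaults (the variable names here are hypothetical, not existing Ray settings):

```python
import logging.handlers
import os


def build_rotating_handler(filepath: str) -> logging.handlers.RotatingFileHandler:
    """Build the export-event file handler with env-var-configurable
    rotation limits, falling back to 100 MB files and 20 backups."""
    max_bytes = int(os.environ.get(
        "RAY_EXPORT_EVENT_MAX_FILE_SIZE_BYTES", 100 * 10**6))
    backup_count = int(os.environ.get(
        "RAY_EXPORT_EVENT_MAX_BACKUP_COUNT", 20))
    return logging.handlers.RotatingFileHandler(
        filepath, maxBytes=max_bytes, backupCount=backup_count
    )
```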
event = ExportEvent()
event.event_id = get_event_id()
event.timestamp = int(datetime.now().timestamp())
if type(event_data) is ExportSubmissionJobEventData:
Suggested change:
- if type(event_data) is ExportSubmissionJobEventData:
+ if isinstance(event_data, ExportSubmissionJobEventData):
nit
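For context on the nit: `isinstance` is the idiomatic check because it also matches subclasses, while a `type(...) is ...` comparison only matches the exact class. A tiny illustration (with made-up class names):

```python
class EventData:
    pass


class SubmissionJobEventData(EventData):
    pass


data = SubmissionJobEventData()

# type() compares the exact class, so a subclass fails the check:
assert type(data) is not EventData

# isinstance() follows the inheritance chain, so a subclass passes:
assert isinstance(data, EventData)
```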
self._gcs_aio_client = gcs_aio_client
self._export_submission_job_event_logger = None
Add a type!
@@ -189,8 +197,25 @@ class JobInfoStorageClient:
    JOB_DATA_KEY_PREFIX = f"{ray_constants.RAY_INTERNAL_NAMESPACE_PREFIX}job_info_"
    JOB_DATA_KEY = f"{JOB_DATA_KEY_PREFIX}{{job_id}}"

-   def __init__(self, gcs_aio_client: GcsAioClient):
+   def __init__(self, gcs_aio_client: GcsAioClient, log_dir: Optional[str] = None):
let's not allow log_dir = None case?
def __init__(self, gcs_aio_client: GcsAioClient, log_dir: str):
I kept log_dir as an optional variable because JobInfoStorageClient is constructed from quite a few places that only read job info (no updates to the KV store, so no export events are emitted). Those callers don't have a log_dir, so unless we have a global log dir that can be fetched, making this variable non-optional would require a refactor throughout the jobs code.
let's rename the variable to export_event_log_dir_root? Something where it's clearly meant to control one specific thing.
if added_num == 1 or overwrite:
    # Write export event if data was updated in the KV store
    try:
        self._write_submission_job_export_event(job_id, job_info)
QQ: we don't write any event for failure cases?
This isn't expected to fail, but it is wrapped in a try/except so the remaining job code isn't affected in any way. If there is a failure, we should log it in the normal job log file rather than the export event log files, because the export event files may be ingested by some consumer.
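The error-handling strategy described above (swallow and log export failures so they can never break the job state update) can be sketched like this; `write_event` stands in for the real _write_submission_job_export_event:

```python
import logging

# The normal job log, not the export event sink, receives the failure.
job_logger = logging.getLogger(__name__)


def put_info_sketch(write_event, job_id: str) -> None:
    """Attempt to write an export event for `job_id`; on any failure,
    log to the regular job log and continue, so the KV-store update
    path is never affected by export-event problems."""
    try:
        write_event(job_id)
    except Exception:
        job_logger.exception(
            "Failed to write submission job export event for %s", job_id
        )
```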
        job_info.status.name
    )
)
if status_value_descriptor is None:
when does it happen?
When a new JobInfo status is added to the python model without updating the proto. We do have a comment to keep these in sync though, so this should be unlikely.
@@ -46,6 +48,59 @@
import psutil


@pytest.mark.asyncio
Can we create a new test suite for all export events? I assume we will need more e2e testing in the python layer eventually, and it'd be nice to put those tests in the same file.
…47468) Add ExportEventLoggerAdapter which will be used to write export events to file from python files. Only a single ExportEventLoggerAdapter instance will exist per source type, so callers can create or get this instance using get_export_event_logger which is thread safe. Write Submission Job export events to file from JobInfoStorageClient.put_info which is called to update the JobInfo data in the internal KV store. Signed-off-by: ujjawal-khare <[email protected]>
Why are these changes needed?
Add ExportEventLoggerAdapter, which will be used to write export events to file from python files. Only a single ExportEventLoggerAdapter instance will exist per source type, so callers can create or get this instance using get_export_event_logger, which is thread safe.
Write Submission Job export events to file from JobInfoStorageClient.put_info, which is called to update the JobInfo data in the internal KV store.

Related issue number
Checks
- I've signed off every commit (git commit -s) in this PR.
- I've run scripts/format.sh to lint the changes in this PR.
- If I've added a method in Tune, I've added it in doc/source/tune/api/ under the corresponding .rst file.